Vertex AI Pipelines
DSL 書き方・使い方は Kubeflow Pipelines へ
テンプレート ギャラリーの事前作成済みテンプレートを使用する  |  Vertex AI  |  Google Cloud
テンプレートギャラリー Google Cloud コンソール
Vertex AI Pipelines Jupyter ノートブック  |  Google Cloud
Google Cloud での傾向モデリングに Kubeflow Pipelines を使用する  |  Cloud アーキテクチャ センター
Vertex AI Pipelinesのチュートリアル | Hakky Handbook
Vertex AI Pipelinesを用いて爆速ML開発の仕組みを構築する LayerXテックアドカレ - LayerX エンジニアブログ
自己流 Vertex AI Pipelines 開発プラクティス|Tatsuya Shirakawa
Vertex AIではじめるKubeflow Pipelines | DevelopersIO
kfp-project/pipelines/optuna-pipeline at main · tonouchi510/kfp-project 具体例
Kubeflow Pipelines v2 で Pipeline の書き方がかなり変わる件について
KFP SDK API Reference
Kubeflow Pipelines — Some Useful Tips | by Revathi Prakash | Medium
コードを実行したらコンパイルして Vertex AI Pipelines に投入する
手元から pipeline を投入するときに
code:run.py
if __name__ == "__main__":
# 邪魔じゃないなら単にこれでも
# dest = __file__.replace(".py", ".yaml")
import os
import tempfile
filename = os.path.basename(__file__).replace(".py", ".yaml")
dest = os.path.join(tempfile.mkdtemp(), filename)
compiler = kfp.compiler.Compiler()
compiler.compile(pipeline_func=pipeline, package_path=dest)
print(f"Generated pipeline: {dest}")
import google.cloud.aiplatform as aip
aip.init(project=project, location=location)
job = aip.PipelineJob(
display_name="sample",
template_path=dest,
)
job.submit(service_account="...")
マシンスペックを指定する
pipeline 側で component ごとに設定、PipelineTask に対する set_ 系メソッドで設定する (TaskConfiguration)
foo_task().set_env_variable("FOO", "bar") 的なやつ
code:task_configurations.py
.add_accelerator_type
.set_accelerator_limit
.set_cpu_limit
.set_memory_request # Vertex AI Pipelines はサポートしてない
.set_memory_limit
.set_env_variable
.set_caching_options
.set_display_name
.set_retry
.ignore_upstream_failure
パイプライン ステップのマシン構成を指定する  |  Vertex AI  |  Google Cloud
デフォルトは e2-standard-4 (4 cpu 16GB mem)
ただし GPU を付けると n1-highmem-2 (2 cpu 13GB mem) になる、e2 は GPU 付けれないため n1 系になる
カスタム トレーニング用のコンピューティング リソースを構成する  |  Vertex AI  |  Google Cloud
使いそうなやつ
料金  |  Vertex AI  |  Google Cloud
注 2024/6 のコストなのでもう変わっているかも
e2-standard-4 GPU なし, デフォルト
.set_cpu_limit("4").set_memory_limit("16G")
e2-highmem-2 GPU なし最安 ($0.133257/h in Tokyo) これ以下を指定する意味ない
.set_cpu_limit("2").set_memory_limit("16G")
n1-highmem-2 GPU 可で最安だが ログでないので使わない
.set_cpu_limit("2").set_memory_limit("13G")`
n1-standard-4 GPU 可の実用最安 ($0.280600/h in Tokyo)
.set_cpu_limit("4").set_memory_limit("15G")
n1-highmem-4 GPU 可
.set_cpu_limit("4").set_memory_limit("26G")
n1-standard-8 GPU 可
.set_cpu_limit("8").set_memory_limit("30G")`
GPU 指定
PipelineTask.add_node_selector_constraint() takes 2 positional arguments but 3 were given
あれ、最近の kfp (2.7)は key いらないらしい
print_torch().add_node_selector_constraint("NVIDIA_TESLA_T4").set_accelerator_limit(1)
kfp.dsl - KFP SDK API Reference
python - Vertex AI pipelines TypeError: PipelineTask.add_node_selector_constraint() takes 2 positional arguments but 3 were given - Stack Overflow
GPU 付けたら起動までにめちゃめちゃ時間がかかる、10分~ 程度
リージョンやマシンタイプと組み合わせられる GPU に制限がある
Compute Engine インスタンスタイプ
アクセラレータの使用 - Vertex AI のロケーション  |  Google Cloud リージョンと使える GPU の対応
各マシンタイプに有効な GPU の数 - カスタム トレーニング用のコンピューティング リソースを構成する  |  Vertex AI  |  Google Cloud
観測上 e2-highmem-2 未満のリソースを要求してもそれより小さいインスタンスは割り当てられない
低スペで GPU つけるとログ出ない
もっと目立つところに書いておけ
GPU - カスタム トレーニング用のコンピューティング リソースを構成する  |  Vertex AI  |  Google Cloud
GPU で n1-highmem-2 などの小型のマシンタイプを使用すると、CPU の制約により、一部のワークロードでロギングが失敗する可能性があります。トレーニング ジョブがログを返さなくなった場合は、より大きなマシンタイプの選択を検討してください。
可能性がありますだけど、経験上 n1-highmem-2 で GPU 乗せるとコンポーネント内のログやエラーが残ったことない、デバッグにめちゃめちゃ苦労するので設定するのがよい
Compute Engine インスタンスタイプ からちょうどいいの選ぶとして、安めこのあたりかなあ
code:py
# n1-highmem-4 ($0.348910/h in Tokyo)
comp1()
.set_cpu_limit("4")
.set_memory_limit("26G")
.add_node_selector_constraint("NVIDIA_TESLA_T4")
.set_accelerator_limit(1)
# n1-standard-4 ($0.280600/h in Tokyo)
comp2()
.set_cpu_limit("4")
.set_memory_limit("15G")
.add_node_selector_constraint("NVIDIA_TESLA_T4")
.set_accelerator_limit(1)
Artifact の扱い
Artifacts | Kubeflow
基本的に返した Artifact は Vertex ML Metadata に記録される
Output[] をコンポーネントの引数で受けたり、Artifactを返すと残る、じゃんじゃか増えていく
metadata をうまくつけておくと検索して使えるので便利
output.path に書き込むと対応する Cloud Storage に書き込まれ、output.uri に gs://... が入ってる
Output[HTML], Output[Markdown] 等はコンソールで内容を表示できる
Output HTML と Output Markdown  |  Vertex AI  |  Google Cloud
Artifact の名前を Output 以外にする
Traditional artifact syntax (component の引数で Output[Artifact] を受ける方)は引数名が Vertex ML Metadata 上の名前になる
Pythonic artifact syntax で名前を指定する方法はない?
Cloud Storage に書き込む & Metadata 残す
独自型を定義してもよい、追加で出せる内容が増える
カスタム スキーマを作成して使用する  |  Vertex AI  |  Google Cloud
output = dsl.Dataset(uri=dsl.get_uri())
output.metadata = {...} でメタデータ残せる
dsl.get_uri(suffix='sample.txt') でファイル名決める
1つのコンポーネントから複数の output を返す時は実質必須、デフォルト suffix="Output" なので同じ書き込み先
output.display_name='...' で表示名
name は Google Cloud リソースの name 参照 (projects/.../locations/... 的な) なので参照時にしか使わない
デフォルトで /gcs/{bucket_name} にアクセスすると読み出せる
Artifact の uri の先頭 gs:// を置換して読んでもよい
input.uri.replace("gs://", "/gcs/")
code:gcs_components.py
@dsl.component(base_image="python:3.11")
def save_artifact() -> dsl.Dataset:
out = dsl.Dataset(uri=dsl.get_uri(suffix='sample.txt')) # dsl.get_uri() で保存先を生成
with open(out.path, "w") as f: # Dataset に対応するローカルパスに書き込み
f.write("sample")
return out # Dataset 返す
@dsl.component(base_image="python:3.11")
def read_artifact(input: dsl.Dataset):
# gs://... が設定されているので GCS FUSE で読めるパスへ
# そのまま Cloud Storage Client などで読んでもよい
path = input.uri.replace("gs://", "/gcs/")
with open(path, "r") as f: # 開いて読む
print(f.readlines())
pass
@dsl.pipeline()
def pipeline():
save_artifact_task = save_artifact()
read_artifact(input=save_artifact_task.output)
Pipeline を Experiment と紐づける
code:experiment_linking.py
job = aip.PipelineJob(
template_path=dest,
parameter_values=parameter_values,
...
)
job.submit(
service_account=service_account,
experiment=experiment, # ここで名前つける
)
メトリックを記録する
コンポーネントで Output[Metrics] を引数に取り log_metrics(key, value) を呼ぶ
pipeline の artifact として残る
加えて pipelinejob.submit(experiment="...") すると、 Vertex AI Experiments の Experiment が作成されメトリックが残る
Experiment に対しパイプラインが Experiment Run として紐づく
メトリックはパイプライン単位で残る、同じキーで書いてみても一応両方見れるが良くはなさそう
code:battle.py
@dsl.component(base_image="python:3.11")
def log_metric_a(metrics: dsl.Outputdsl.Metrics):
metrics.log_metric("from", "a")
metrics.log_metric("a", 123)
@dsl.component(base_image="python:3.11")
def log_metric_b(metrics: dsl.Outputdsl.Metrics):
metrics.log_metric("from", "b")
metrics.log_metric("b", 456)
https://gyazo.com/d88d0107306075ca7191dbd9cfff7ac4 https://gyazo.com/3088f7bcd1ba8d4befe9efbe8836810b
複数のコンポーネントで別々のメトリックを書きたいなら自分で google.cloud.aiplatform 呼ぶしかないかな
kfp は metrics.log_metric(key, value) に対し
ExperimentRun は run.log_metrics({ key: value }) である
run_pipeline と同じ名前なら同じ Experiment に紐づく
パイプライン実行とテスト実行が同じテーブルなの微妙に嫌だけど、カラムあるってことは混ぜて使う想定かねえ
繰り返し実行で名前がかぶるので
被ってたら resume で上書きする
code:resume.py
name = "use-aiplatform-in-component"
resume = aiplatform.ExperimentRun.get(name) is not None
with aiplatform.start_run(name, resume=resume) as run:
run.log_metrics(...)
別々の ID を振る
pipeline からメタデータ変数を component に渡す Kubeflow Pipelines#666d40f738446100009320d5
task_id=dsl.PIPELINE_TASK_ID_PLACEHOLDER
code:task_id.py
name = f"use-aiplatform-in-component-{task_id}"
with aiplatform.start_run(name) as run:
run.log_metrics(...)
テスト実行にデータを自動的に記録する  |  Vertex AI  |  Google Cloud
テスト実行にデータを手動で記録する  |  Vertex AI  |  Google Cloud
パイプライン実行リージョンと Exeperiment のリージョン別にする
うーん、run_pipeline ではパイプライン実行リージョンを指定し、expeirment は指定しない(してもいいが)
kfp の metrics は使わず aiplatform で自分で送る、かなあ
GCS FUSE
Cloud Storage をマウント ファイル システムとして使用する  |  Vertex AI  |  Google Cloud
Cloud Storage FUSE を使用した Cloud Storage ファイルの読み取りと書き込み - トレーニング コードを準備する  |  Vertex AI  |  Google Cloud
トレーニング コードを準備する  |  Vertex AI  |  Google Cloud
pipeline_root と関係なく gs://BUCKET_NAME/... にアクセスして権限があれば読み書きできる
仮想的に世界の全ての bucket がマウントされているような雰囲気、アクセスした時に権限がなければ弾かれる
impersonate で読み書きしたいが方法はなさそう
Secret Manager 使う
Secret Manager でシークレットを構成する  |  Vertex AI  |  Google Cloud
@dsl.component(..., packages_to_install=['google-cloud-secret-manager']) して普通に client 作って参照する
このサンプルの読み出すコンポーネントから受け渡したら Cloud Storage に残って良くないのでは?
pipeline 引数
def pipeline(foo: int, bar: int): ... して
aiplatform.PipelineJob(parameter_values={foo: 123, bar: 456}) のようにジョブ設定で渡す
キャッシュはデフォルト有効
実行キャッシュの構成  |  Vertex AI  |  Google Cloud
らしいので 明に切る...と思ったけど実際あると便利
切る方法
foo_task(...).set_caching_options(False) コンポーネント単位で無効
aiplatform.PipelineJob(..., enable_caching=False) パイプライン単位で無効(Job投入前の設定)
パイプライン名、入出力、コンポーネントの外形的な仕様がキャッシュキーになる
パイプライン コンポーネントは決定論的に構築する必要があります。特定の入力セットでは、常に同じ出力が生成される必要があります
なのでコンポーネント内の実装いじって実行してもキャッシュのままなことがある
またコンポーネント実行で失敗していてもキャッシュ使い回される? ぽいので、修正しているつもりができてないことがある
コンポーネントとして呼ばれたパイプラインのキャッシュはどうなる?
共有の使い回しパイプライン
パイプラインA → 使い回しパイプライン
パイプラインB → 使い回しパイプライン
経験的にはされてなさそう(ちゃんとみてない)
親となる呼び出し元の名前が変わったらキャッシュは共有されない、まあそうか
キャッシュが効いていても出力するパイプライン実行には Artifact が追加される
助かる挙動
1. パイプライン実行1で Aritfact A を生成
2. パイプラインを修正
3. パイプライン実行2 で
Artifact A の生成はキャッシュが効いて処理をスキップ
Aritfact B を新たに生成
としたときに、A は 実行1, 実行2 両方のコンテキストから引ける、B は 実行2 からのみ
👉 Vertex ML Metadata#6667aa9b384461000069867c
実行サービスアカウント
Failed to create pipeline job. Error: Vertex AI Service Agent ... does not have permission to access Artifact Registry repository
デフォルトでは Vertex AI Service Agent のサービスアカウントで動作する?
なのでカスタムイメージ pull するのに Artifact Registry の権限必要?
カスタム サービス アカウントを構成する - カスタム サービス アカウントを使用する  |  Vertex AI  |  Google Cloud
Artifact Registry からイメージを pull するようにカスタム サービス アカウントを構成することはできません。Vertex AI は、デフォルトのサービス アカウントを使用してイメージを pull します。
このデフォルトは Service Agent なのか、Compute Default なのかどっちだ
大抵実行用サービスアカウントを作って使うだろうし
job.submit(service_account=...) で指定したユーザの権限で ArtifactRegistry pull している
Cloud Monitoring 指標
Google Cloud metrics  |  Cloud Monitoring
ml.googleapis.com/ prefix のもの
Cloud ML Job
失敗したコンポーネントが1つあれば全体を失敗させる
障害ポリシーを構成する  |  Vertex AI  |  Google Cloud
フェイルファストにする
aiplatform.PipelineJob(..., failure_policy='fast') デフォルトは slow
Google Cloud 用コンポーネント
Google Cloud Pipeline Components Reference Documentation
Google Cloud Artifact Types - Google Cloud Pipeline Components Reference Documentation
pipeline_root
省略するとデフォルトで以下が root になる
gs://{PROJECT_ID}-vertex-pipelines-{REGION}/output_artifacts/
各パイプラインが実行されるのは
{PIPELINE_ROOT}/{実行PROJECT_NUMBER?}/{PIPELINE_JOB_NAME}
PIPELINE_JOB_NAME は dsl.PIPELINE_JOB_NAME_PLACEHOLDER
既に存在する Artifact を返す
Vertex AI Pipeline で既に存在する Artifact を返す
1つの func を別々の base_image を指定したい
コードコピペしたくねえが hermetic に(component の中に一通り)書くので使いまわしづらい...と思いきや
ここは普通の Python としてデコレータを単に呼べば良い
code:call_decorator.py
image_v1 = 'asia-northeast1-docker.pkg.dev/...'
image_v2 = 'asia-northeast1-docker.pkg.dev/...'
def my_component(...) -> dsl.Artifact:
...
foo_v1 = dsl.component(base_image=image_v1, func=my_component)
foo_v2 = dsl.component(base_image=image_v2, func=my_component)
@dsl.pipeline
def pipeline():
foo_v1_task = foo_v1(...)
foo_v2_task = foo_v2(...)
compare(v1=foo_v1_task.output, v2=foo_v2_task.output)
#GoogleCloud